多进程创建股票数据

您所在的位置:网站首页 python 多线程慢 多进程创建股票数据

多进程创建股票数据

2024-02-03 09:09| 来源: 网络整理| 查看: 265

前面文章已经记录了股票数据的下载及相关因子计算的过程,本文记录使用多进程创建股票数据的方法。

这里所说的创建数据,包括股票数据的下载和相关因子的计算,会把所有股票的全部历史数据进行下载和计算。后面的文章会介绍如何更新数据,即只处理未创建过的数据。

还有一点说明,这里使用多进程,而非多线程进行数据创建,主要是由于BaoStock不支持多线程下载数据。因此使用Python的多进程模块multithreading进行数据创建。

主要代码分析

新建源文件,命名为data_center_v6.py,全部内容见文末,v6新增3个函数:

新增计算股票代码分组函数 def get_code_group(process_num, stock_codes):

该函数获取股票代码分组,用于多进程计算,每个进程处理一组股票,其中:

参数process_num为进程数参数stock_codes为待处理的股票代码返回值为分组后的股票代码列表,列表的每个元素为一组股票代码的列表 code_group = [[] for i in range(process_num)]

创建空的分组,类型为列表,每个元素为1个空列表,共有process_num个空列表

for index, code in enumerate(stock_codes): code_group[index % process_num].append(code)

按余数为每个分组分配股票。假设我们有10只股票代码依次为0到9,并假设process_num=3,分组后的股票如下所示:

[[0, 3, 6, 9], [1, 4, 7], [2, 5, 8]]

可以看到,我们一共得到process_num=3个分组,位置索引对process_num取余,余数为0、1、2的股票各自被分到1组。

return code_group

返回分组结果。

新增多进程调用函数 def multiprocessing_func(func, args):

该函数用于多进程调用函数,其中:

参数func为子进程要调用的函数参数args为func的参数,类型为元组,第0个元素为进程数,第1个元素为股票代码列表,如果func还有其他参数,则依次往后排列返回值为包含各子进程返回对象的列表

因为程序中涉及较多多进程调用的情况,因此抽象出此函数,方便使用。

results = []

用于保存各子进程返回对象的列表。

with multiprocessing.Pool(processes=args[0]) as pool:

创建进程池。processes表示所用的进程数,这里多数调用均使用默认值为61。当大于61时,在我的虚拟机上会报错(网上资料显示Windows基本都会报这个错误)。在我的虚拟机里运行程序时,把process_num设置为4,CPU占用率已经能达到100%,实测还是process_num值越大,程序执行越快,理论上进程越多,程序越能获得更多的时间片。在我的PC上,把process_num设置为61,CPU占用率也只有20%左右。

for codes in get_code_group(args[0], args[1]): results.append(pool.apply_async(func, args=(codes, *args[2:],)))

对于每个股票分组,使用1个子进程进行计算,func的第0个参数为待处理的股票代码列表,后续参数保存在args[2:]中。调用apply_async方法,进行多进程异步计算。

pool.close()

阻止后续任务提交到进程池。

pool.join()

等待所有进程结束。

return results

返回包含各子进程返回对象的列表。

新增多进程创建数据函数 def create_data_mp(stock_codes, process_num=61, from_date='1990-12-19', to_date=datetime.date.today().strftime('%Y-%m-%d'), adjustflag='2'):

该函数使用多进程创建指定日期内,指定股票的日线数据,计算扩展因子,其中:

参数stock_codes为待创建数据的股票代码参数process_num为进程数参数from_date为日线开始日期参数to_date为日线结束日期参数adjustflag为复权选项,为1时表示后复权,为2时表示前复权,为3时表示不复权,默认为前复权返回值为空 multiprocessing_func(create_data, (process_num, stock_codes, from_date, to_date, adjustflag,))

调用multiprocessing_func进行多进程数据创建,这里会用多进程调用create_data函数,后面元组的第0个元素为进程数,第1个元素为待分组的股票代码列表,后续参数依次为create_data函数所使用的参数。

小结

本文记录了使用多进程创建数据的过程,创建的数据只是用于打印,只要确保程序能正常运行即可,不需要等待程序运行结束。

下一篇文章起,将介绍把数据存入数据库的过程。

data_center_v6.py的全部代码如下:

import baostock as bs import datetime import sys import numpy as np import pandas as pd import multiprocessing # 可用日线数量约束 g_available_days_limit = 250 # BaoStock日线数据字段 g_baostock_data_fields = 'date,open,high,low,close,preclose,volume,amount,adjustflag,turn,tradestatus,pctChg,peTTM,pbMRQ, psTTM,pcfNcfTTM,isST' def get_stock_codes(date=None): """ 获取指定日期的A股代码列表 若参数date为空,则返回最近1个交易日的A股代码列表 若参数date不为空,且为交易日,则返回date当日的A股代码列表 若参数date不为空,但不为交易日,则打印提示非交易日信息,程序退出 :param date: 日期 :return: A股代码的列表 """ # 登录baostock bs.login() # 从BaoStock查询股票数据 stock_df = bs.query_all_stock(date).get_data() # 如果获取数据长度为0,表示日期date非交易日 if 0 == len(stock_df): # 如果设置了参数date,则打印信息提示date为非交易日 if date is not None: print('当前选择日期为非交易日或尚无交易数据,请设置date为历史某交易日日期') sys.exit(0) # 未设置参数date,则向历史查找最近的交易日,当获取股票数据长度非0时,即找到最近交易日 delta = 1 while 0 == len(stock_df): stock_df = bs.query_all_stock(datetime.date.today() - datetime.timedelta(days=delta)).get_data() delta += 1 # 注销登录 bs.logout() # 筛选股票数据,上证和深证股票代码在sh.600000与sz.39900之间 stock_df = stock_df[(stock_df['code'] >= 'sh.600000') & (stock_df['code']


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3